package com.niit.flume;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.apache.log4j.Logger;
import org.slf4j.LoggerFactory;


public class MySink extends AbstractSink implements Configurable {
    private static final Logger LOG = (Logger) LoggerFactory.getLogger(MySink.class.getName());
    private String prefix;
    private String suffix;

    //循环的向channel拉取数据
    @Override
    public Status process() throws EventDeliveryException {
        //声明返回值的状态信息
        Status status;
        //获取当前Sink绑定的Channel
        Channel ch = getChannel();
        //获取事务
        Transaction txn = ch.getTransaction();
        //声明事件
        Event event;
        //开启事务
        txn.begin();

        //读取channel中的事件，知道读取到事件结束循环
        while (true){
            event = ch.take();
            if(event !=null){
                break;
            }
        }

        try{
            //信息打印
            LOG.info(prefix + new String(event.getBody()) + suffix);
            txn.commit();
            status = Status.READY;
        }catch (Exception e){
            //遇到异常，进行回滚
            txn.rollback();
            status = Status.BACKOFF;
        }finally {
            //关闭事务
            txn.close();
        }

        return status;
    }

    //读取配置文件
    @Override
    public void configure(Context context) {
        prefix = context.getString("prefix","BD");
        suffix = context.getString("suffix");
    }
}